package com.geek.mqttiot.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * @ClassName IotMqttSubscriberConfig
 * @Description 消息订阅配置
 * @Author zzm
 * @Data 2020/10/14 12:48
 * @Version 1.0
 */
@Slf4j
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory, mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

    /**
     * 消息订阅
     */
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handlerTest() {
        return message -> {
            try {
                String string = message.getPayload().toString();
                System.out.println("接收到消息：" + string);
            } catch (MessagingException ex) {
                log.error(ex.getMessage());
            }
        };
    }
}
